[SPARK-40193][SQL] Merge subplans with different filter conditions#55298
[SPARK-40193][SQL] Merge subplans with different filter conditions#55298peter-toth wants to merge 1 commit intoapache:masterfrom
Conversation
219b19f to
66f7683
Compare
### What changes were proposed in this pull request?
`PlanMerger` is extended to merge non-correlated non-grouping aggregate subplans
that differ only in their `WHERE` filter conditions.
Filter merging follows the same recursive plan-matching logic as the rest of
`PlanMerger` and handles three cases:
- `(np: Filter, cp: Filter)` with different conditions: both conditions are
aliased as boolean attributes in a `Project`, a merged `Filter(OR(f0, f1))`
is introduced, and the aliases are propagated up to the enclosing `Aggregate`
where each side's expressions receive a `FILTER (WHERE ...)` clause.
- `(np: Filter, cp)` or `(np, cp: Filter)`: only one side has a filter; the
condition is exposed as a `Project` attribute and propagated up so only that
side's aggregate expressions receive a `FILTER` clause.
- Equal filter conditions pass through unchanged.
When plans also differ in intermediate `Project` expressions above a `Filter`,
those expressions are wrapped with `If(filterAttr, expr, null)` to avoid
computing them for rows that do not match that side's filter condition. Plain
attribute references are never wrapped since reading a column value is free.
**Example**
```
// Input plans
Aggregate [sum(a) AS sum_a] Aggregate [max(d) AS max_d]
+- Filter (a < 1) +- Project [udf(a) AS d]
+- Scan t +- Filter (a > 1)
+- Scan t
// Merged plan
Aggregate [sum(a) FILTER f0 AS sum_a, max(d0) FILTER f1 AS max_d]
+- Project [a, If(f1, udf(a), null) AS d0, f0, f1]
+- Filter (f0 OR f1)
+- Project [a, (a < 1) AS f0, (a > 1) AS f1]
+- Scan t
```
**Benefit**: a single scan of `t` computes both aggregates, which is typically
cheaper than two separate scans. The `If` wrapping ensures `udf(a)` is only
evaluated for rows that match `a > 1`.
**Drawback** (symmetric case only): the merged `Filter(f0 OR f1)` is less
selective than each individual filter, which may reduce IO pruning such as
partition or file skipping. On heavily partitioned or file-pruned tables the
extra IO can outweigh the scan-deduplication benefit. The asymmetric case
(`(np: Filter, cp)`) is always beneficial because the unfiltered side would
have read all the data anyway.
**Configs**:
- `spark.sql.planMerge.filterPropagation.enabled` (internal, default `true`):
master switch; disabling it turns off all filter-based merging.
- `spark.sql.planMerge.symmetricFilterPropagation.enabled` (default `true`):
controls the symmetric `(Filter, Filter)` case specifically, so users on
IO-pruning-sensitive workloads can disable only that path while keeping the
always-beneficial asymmetric merging.
`MergeResult.outputMap` is changed from `AttributeMap[Attribute]` to
`AttributeMap[Int]`, mapping each input plan attribute to its positional index
in the merged output. Positional indices remain stable across subsequent
`PlanMerger.merge` calls (outputs are only ever appended), whereas retained
`Attribute` values can become stale when filter merging replaces expressions
with new aliases. This also simplifies the two call sites in `MergeSubplans`.
### Why are the changes needed?
Computing aggregates over the same table with different `WHERE` clauses is a
common analytical pattern (e.g. conditional sums or counts for different
predicates). Without this change each subquery forces a separate full scan;
merging them reduces scan count and overall query cost.
### Does this PR introduce _any_ user-facing change?
Yes. A new config `spark.sql.planMerge.symmetricFilterPropagation.enabled`
(default `true`) is added. The optimization is otherwise transparent: queries
produce the same results, and both configs can be set to `false` to restore
the previous behavior.
### How was this patch tested?
New unit tests in `MergeSubplansSuite` and new end-to-end tests in
`PlanMergeSuite` covering the basic two-subplan cases, three-subplan merging,
disabled configs, grouping aggregates (not merged), asymmetric filters, stacked
filters, and reversed filter ordering.
### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6 (Anthropic)
66f7683 to
5f95716
Compare
|
I measured the following improvements with the affected queries: |
|
@LuciferYang this is now ready for review. |
|
Thanks for pinging me. I will provide feedback tomorrow. |
|
cc @cloud-fan, @dongjoon-hyun, @yaooqinn as well |
LuciferYang
left a comment
There was a problem hiding this comment.
Overall it looks good.
| val newNPCondition = npFilter.fold(mappedNPCondition) { | ||
| case (f, _) => And(f, mappedNPCondition) | ||
| } | ||
| val childProject = mergedChild.asInstanceOf[Project] |
There was a problem hiding this comment.
nit: mergedChild.asInstanceOf[Project] will throw ClassCastException if a future change breaks the invariant that a MERGED_FILTER_TAG filter always has a Project child. Consider a pattern match with an explicit error so the failure mode is clear:
val childProject = mergedChild match {
case p: Project => p
case other => throw SomeException
}| .createOptional | ||
|
|
||
| val PLAN_MERGE_FILTER_PROPAGATION_ENABLED = | ||
| buildConf("spark.sql.planMerge.filterPropagation.enabled") |
There was a problem hiding this comment.
The spark.sql.planMerge.* namespace is new — no other configs use it. Should this live under spark.sql.optimizer.* for discoverability, alongside spark.sql.optimizer.runtime.bloomFilter.enabled and similar? E.g. spark.sql.optimizer.mergeSubplans.filterPropagation.enabled, which also matches the rule name MergeSubplans.
| .version("4.2.0") | ||
| .withBindingPolicy(ConfigBindingPolicy.SESSION) | ||
| .booleanConf | ||
| .createWithDefault(true) |
There was a problem hiding this comment.
The symmetric case broadens the combined filter to OR(f1, f2), which can reduce IO pruning on partitioned or file-pruned tables (as the doc itself notes). For a brand-new optimization shipped enabled-by-default, this can cause silent regressions on upgrade. Should this default to false for the first release, with users opting in once the behavior is validated in production? The asymmetric case (always beneficial) can keep its default.
| comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer.analyze) | ||
| } | ||
|
|
||
| test("SPARK-40193: Merge non-grouping subqueries with different filter conditions") { |
There was a problem hiding this comment.
Could you add three more tests to lock down the contract on the following paths?
-
Distinct aggregates: a test using
countDistinct($"a")(or anyisDistinct = trueaggregate) on each side, asserting either correct merge with FILTER clauses or correct rejection. Without it, the supported/unsupported boundary insupportedAggregateMergeis implicit. -
Ifwrapping with computed expressions: a test where each side has aProjectbetweenFilterandAggregatecontaining a non-attribute expression (e.g.udf($"a") AS dor($"a" + 1) AS d). The PlanMerger scaladoc example (around line 100 inPlanMerger.scala) shows this scenario, but no test currently exercises theIf(filterAttr, expr, null)wrapping branch inmergeNamedExpressionsfor non-attribute expressions. -
Pre-existing FILTER on aggregate expressions: a test where one side's aggregate already carries a FILTER clause (e.g.
count($"a") FILTER (WHERE $"b" > 0)).applyFilterToAggregateExpressionscombines viaAnd(propagatedFilter, existingFilter)but no test covers this combination.
| if (cp.getTagValue(PlanMerger.MERGED_FILTER_TAG).isDefined) { | ||
| // cp Filter is already a merged filter from a previous round: its condition |
| Some((newNPFilter, true)), None)) | ||
| } | ||
| } else { | ||
| // First-time filter propagation: alias both sides' conditions as boolean |
What changes were proposed in this pull request?
PlanMergeris extended to merge non-correlated non-grouping aggregate subplans that differ only in theirWHEREfilter conditions.Filter merging follows the same recursive plan-matching logic as the rest of
PlanMergerand handles three cases:(np: Filter, cp: Filter)with different conditions: both conditions are aliased as boolean attributes in aProject, a mergedFilter(OR(f0, f1))is introduced, and the aliases are propagated up to the enclosingAggregatewhere each side's expressions receive aFILTER (WHERE ...)clause.(np: Filter, cp)or(np, cp: Filter): only one side has a filter; the condition is exposed as aProjectattribute and propagated up so only that side's aggregate expressions receive aFILTERclause.When plans also differ in intermediate
Projectexpressions above aFilter, those expressions are wrapped withIf(filterAttr, expr, null)to avoid computing them for rows that do not match that side's filter condition.Example
Benefit: a single scan of
tcomputes both aggregates, which is typically cheaper than two separate scans.Drawback (symmetric case only): the merged
Filter(f0 OR f1)is less selective than each individual filter, which may reduce IO pruning such as partition or file skipping. On heavily partitioned or file-pruned tables the extra IO can outweigh the scan-deduplication benefit. The asymmetric case ((np: Filter, cp)) is always beneficial because the unfiltered side would have read all the data anyway.Configs:
spark.sql.planMerge.filterPropagation.enabled(internal, defaulttrue): master switch; disabling it turns off all filter-based merging.spark.sql.planMerge.symmetricFilterPropagation.enabled(defaulttrue): controls the symmetric(Filter, Filter)case specifically, so users on IO-pruning-sensitive workloads can disable only that path while keeping the always-beneficial asymmetric merging.MergeResult.outputMapis changed fromAttributeMap[Attribute]toAttributeMap[Int], mapping each input plan attribute to its positional index in the merged output. Positional indices remain stable across subsequentPlanMerger.mergecalls (outputs are only ever appended), whereas retainedAttributevalues can become stale when filter merging replaces expressions with new aliases. This also simplifies the two call sites inMergeSubplans.Why are the changes needed?
Computing aggregates over the same table with different
WHEREclauses is a common analytical pattern (e.g. conditional sums or counts for different predicates). Without this change each subquery forces a separate full scan; merging them reduces scan count and overall query cost.Does this PR introduce any user-facing change?
Yes. A new config
spark.sql.planMerge.symmetricFilterPropagation.enabled(defaulttrue) is added. The optimization is otherwise transparent: queries produce the same results, and both configs can be set tofalseto restore the previous behavior.How was this patch tested?
New unit tests in
MergeSubplansSuiteand new end-to-end tests inPlanMergeSuitecovering the basic two-subplan cases, three-subplan merging, disabled configs, grouping aggregates (not merged), asymmetric filters, stacked filters, and reversed filter ordering.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Sonnet 4.6